The code below provides a skeleton for the model building & training component of your project. You can add/remove/build on the code however you see fit; it is meant as a starting point.
This notebook was initialized and executed locally on a workstation equipped with an NVIDIA RTX 3080 Founders Edition GPU.
import numpy as np # Linear algebra.
import pandas as pd # Data processing, CSV file I/O (e.g. pd.read_csv).
import os
from glob import glob
%matplotlib inline
import matplotlib.pyplot as plt
import matplotlib.image as mpimg # Read png files.
# Set the plotting style of figures.
plt.style.use('ggplot')
# Plotting pretty figures and avoid blurry images.
%config InlineBackend.figure_format='retina'
## Import any other stats/DL/ML packages you may need here. E.g. Keras, scikit-learn, etc.
import math
import random
from random import sample
import csv
import sklearn.model_selection as skl
from sklearn.metrics import roc_curve, auc, precision_recall_curve, confusion_matrix
from sklearn.metrics import plot_precision_recall_curve, average_precision_score, f1_score
from sklearn.preprocessing import binarize
import scipy
from scipy.ndimage import gaussian_filter
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.applications.vgg16 import VGG16
# from tensorflow.keras.applications.vgg19 import VGG19
# from tensorflow.keras.applications.resnet import ResNet50
from tensorflow.keras.optimizers import Adam, RMSprop
from tensorflow.keras.callbacks import ModelCheckpoint, LearningRateScheduler, EarlyStopping, ReduceLROnPlateau
from tensorflow import keras
from keras.layers import GlobalAveragePooling2D, Dense, Dropout, Flatten, Conv2D, MaxPool2D, MaxPooling2D, BatchNormalization
from keras.utils.vis_utils import plot_model
RND_STATE = 42 # Random state used for reproducibility.
# Inspect the GPU with the nvidia-smi CLI (Jupyter shell escape -- not plain Python).
!nvidia-smi
import subprocess
# Count attached GPUs: `nvidia-smi -L` prints one 'UUID' per device.
str(subprocess.check_output(["nvidia-smi", "-L"])).count('UUID')
# NOTE(review): __future__ imports must be the first statement of a module;
# this only works because each notebook cell is compiled separately.
from __future__ import absolute_import, division, print_function, unicode_literals
# TF2 executes eagerly by default; recorded here for the log below.
executing_eagerly = tf.executing_eagerly()
print("TensorFlow version: ", tf.__version__)
print("Eager mode: ", executing_eagerly)
# NOTE(review): tensorflow.python.client is a private API; the public
# tf.config.list_physical_devices (used below) covers the same need.
from tensorflow.python.client import device_lib
def get_available_gpus():
    """Return the names of all local devices that TensorFlow reports as GPUs."""
    gpu_names = []
    for device in device_lib.list_local_devices():
        if device.device_type == 'GPU':
            gpu_names.append(device.name)
    return gpu_names
# Log GPU visibility via the public tf.config API.
print("Are GPUs available? ", tf.config.experimental.list_physical_devices("GPU"))
print("Num of GPUs available: ", len(tf.config.list_physical_devices('GPU')))
get_available_gpus()
import warnings
# Silence library warnings to keep notebook output readable.
warnings.filterwarnings("ignore")
## Paths to the data directories.
data_dir = './data' # Root data directory.
# BUG FIX: the original concatenated data_dir + './train', which yields the
# nonexistent path './data./train'; os.path.join builds the intended paths.
train_dir = os.path.join(data_dir, 'train') # Path to train directory.
test_dir = os.path.join(data_dir, 'test') # Path to test directory.
## Display a sample chest x-ray image.
plt.figure(figsize=(16,16))
plt.imshow(load_img('./data/images_001/images/00000057_003.png'))
plt.show()
## Below is some helper code to read all of your full image filepaths into a dataframe for easier manipulation.
## Load the NIH metadata (one row per image) into all_xray_df.
all_xray_df = pd.read_csv('./data/Data_Entry_2017.csv')
# Map image filename -> full path for every png under ./data/images*/*/.
all_image_paths = {os.path.basename(x): x for x in
glob(os.path.join('./data', 'images*', '*', '*.png'))}
print('Scans found:', len(all_image_paths), ', Total Headers', all_xray_df.shape[0])
# Attach the resolved file path to each metadata row (NaN if the image is missing).
all_xray_df['path'] = all_xray_df['Image Index'].map(all_image_paths.get)
all_xray_df.sample(3, random_state=RND_STATE)
## Create one binary indicator column per individual finding, rather than
## working directly with the pipe-delimited 'Finding Labels' column.
findings = set()
for labels in all_xray_df['Finding Labels'].unique():
    findings.update(labels.split('|'))
print(f'Total number of single diagnoses in the data: {len(findings)}')
# Split each row's labels once and test exact membership. The original used
# substring matching ('label in finding'), which would mislabel any finding
# whose name is contained in another finding's name.
split_labels = all_xray_df['Finding Labels'].str.split('|')
for label in findings:
    all_xray_df[label] = split_labels.map(lambda parts: 1. if label in parts else 0.)
    print(f'{label}: {int(all_xray_df[label].sum())}')
all_xray_df.head()
## Create a 'pneumonia_class' column ('Positive'/'Negative') so the problem
## can be treated as binary classification.
# np.where is vectorized and much faster than the original row-wise
# .apply(..., axis=1); the resulting values are identical.
all_xray_df['pneumonia_class'] = np.where(
    all_xray_df['Pneumonia'] == 1., 'Positive', 'Negative')
all_xray_df.head()
# Inspect the positive and negative subsets and their sizes.
all_xray_df[all_xray_df['pneumonia_class']=='Positive']
all_xray_df[all_xray_df['pneumonia_class']=='Positive'].shape
all_xray_df[all_xray_df['pneumonia_class']=='Negative']
all_xray_df[all_xray_df['pneumonia_class']=='Negative'].shape
def create_splits(**vargs):
    """Split a dataframe into a balanced training set and a 20/80 validation set.

    Keyword Args:
        df (pd.DataFrame): must contain a 'pneumonia_class' column holding
            'Positive'/'Negative' labels.

    Returns:
        (train_data, val_data): train_data is down-sampled to 50/50
        positive/negative; val_data is down-sampled to ~20% positive /
        80% negative.
    """
    df = vargs['df']
    # Stratified 80/20 split. random_state added for reproducibility -- the
    # original omitted it, so every run produced a different split despite
    # the notebook's stated use of RND_STATE everywhere else.
    train_data, val_data = skl.train_test_split(
        df, test_size=.2, stratify=df['pneumonia_class'], random_state=RND_STATE)
    train_data_ratio = len(train_data[train_data['pneumonia_class']=='Positive'])/len(train_data)
    val_data_ratio = len(val_data[val_data['pneumonia_class']=='Positive'])/len(val_data)
    print(f'Training set pneumonia: {100.*train_data_ratio: .2f}%\nValidation set pneumonia: {100.*val_data_ratio: .2f}%')
    # Make the training set contain the same number of positive and negative cases (balanced).
    train_pos_inds = train_data[train_data['pneumonia_class']=='Positive'].index.tolist()
    train_neg_inds = train_data[train_data['pneumonia_class']=='Negative'].index.tolist()
    # Down-sample the negatives to the number of positives; seed for reproducibility.
    random.seed(RND_STATE)
    train_neg_sample = sample(train_neg_inds, len(train_pos_inds))
    train_data = train_data.loc[train_pos_inds+train_neg_sample]
    train_data_ratio = len(train_data[train_data['pneumonia_class']=='Positive'])/len(train_data)
    print(f'Training set corrected (50/50), Pneumonia: {100.*train_data_ratio: .2f}%')
    # Make the validation set ~20% positive / 80% negative: keep 4 negatives
    # per positive. (The original comment had the ratio reversed.)
    val_pos_inds = val_data[val_data['pneumonia_class']=='Positive'].index.tolist()
    val_neg_inds = val_data[val_data['pneumonia_class']=='Negative'].index.tolist()
    val_neg_sample = sample(val_neg_inds, 4*len(val_pos_inds))
    val_data = val_data.loc[val_pos_inds+val_neg_sample]
    val_data_ratio = len(val_data[val_data['pneumonia_class']=='Positive'])/len(val_data)
    print(f'Validation set corrected (20/80), Pneumonia: {100.*val_data_ratio :.2f}%')
    return train_data, val_data
# Build the splits and sanity-check sizes and class balance.
train_data, val_data = create_splits(df=all_xray_df)
print(f'Training set size: {len(train_data)}\nValidation set size: {len(val_data)}')
# Counts of positive cases in train / negative cases in validation.
(train_data['pneumonia_class']=='Positive').value_counts()
(val_data['pneumonia_class']=='Negative').value_counts()
We confirm that both train_data and val_data contain the expected numbers of positive/negative Pneumonia cases in each set.
# Gender distribution of the validation set.
val_data['Patient Gender'].value_counts()
T-test for changes in the Age distribution of Male patients diagnosed with Pneumonia (train_data vs. the full dataset):
scipy.stats.ttest_ind(
all_xray_df['Patient Age'][(all_xray_df['Pneumonia']==True) & (all_xray_df['Patient Gender']=='M')],
train_data['Patient Age'][(train_data['Pneumonia']==True) & (train_data['Patient Gender']=='M')])
T-test for changes in the Age distribution of Female patients diagnosed with Pneumonia (train_data vs. the full dataset):
scipy.stats.ttest_ind(
all_xray_df['Patient Age'][(all_xray_df['Pneumonia']==True) & (all_xray_df['Patient Gender']=='F')],
train_data['Patient Age'][(train_data['Pneumonia']==True) & (train_data['Patient Gender']=='F')])
# Gender distribution of the training set.
train_data['Patient Gender'].value_counts()
We concur that the training data, after sampling, revealed 57.9% male patients and 42.1% female patients.
T-test for changes in the Age distribution of Male patients diagnosed with Pneumonia (val_data vs. the full dataset):
scipy.stats.ttest_ind(
all_xray_df['Patient Age'][(all_xray_df['Pneumonia']==True) & (all_xray_df['Patient Gender']=='M')],
val_data['Patient Age'][(val_data['Pneumonia']==True) & (val_data['Patient Gender']=='M')])
T-test for changes in the Age distribution of Female patients diagnosed with Pneumonia (val_data vs. the full dataset):
scipy.stats.ttest_ind(
all_xray_df['Patient Age'][(all_xray_df['Pneumonia']== True) & (all_xray_df['Patient Gender']=='F')],
val_data['Patient Age'][(val_data['Pneumonia']==True) & (val_data['Patient Gender']=='F')])
# Gender distribution of the validation set (after sampling).
val_data['Patient Gender'].value_counts()
We concur that the validation data, after sampling, revealed 56.64% male patients and 43.36% female patients.
Note that the t-tests shown above indicate that the Age and Gender distributions in both train_data and val_data reflect the demographics of the overall dataset.
# Viewing-position (PA/AP) distribution of the training set.
train_data['View Position'].value_counts()
We concur that the training data, after sampling, revealed 52.4% PA and 47.6% AP viewing positions.
# Viewing-position (PA/AP) distribution of the validation set.
val_data['View Position'].value_counts()
We concur that the validation data, after sampling, revealed 55.66% PA and 44.34% AP viewing positions.
## Hyperparameter setup.
IMG_SIZE = (224, 224) # Image size the generators resize to (VGG16's expected input).
TARGET_SIZE = (512, 512) # NOTE(review): defined but unused below -- the generators use IMG_SIZE.
LR_RATE = 1e-5 # Learning rate.
LOSS = 'binary_crossentropy' # Loss function.
METRICS = [ # Accuracy metrics.
'binary_accuracy']
NUM_EPOCHS = 15 # Number of epochs.
## When it comes to "Out Of Memory" (OOM) errors when training,
# the most straightforward thing to do is to reduce the BATCH_SIZE hyperparameter.
BATCH_SIZE = 8 # Batch size.
def my_image_augmentation(**vargs):
    """Return an ImageDataGenerator: augmented for training, rescale-only otherwise.

    Keyword Args:
        train (bool): required; True enables augmentation.
        rotation (int): max rotation in degrees (default 20).
        shear (float): shear intensity (default .15).
        zoom (float): zoom range (default .12).

    Augmentations are kept conservative for medical imaging: horizontal flips
    only (no vertical flips), small shifts, rotations, shears and zooms.
    Validation/test data is only rescaled to [0, 1].
    """
    # dict.get with defaults replaces the conditional expressions. This also
    # fixes a copy-paste bug: the zoom default was gated on 'rotation' being
    # present ("vargs['zoom'] if 'rotation' in vargs else .12"), which raised
    # KeyError whenever rotation was supplied without zoom.
    rotation = vargs.get('rotation', 20)
    shear = vargs.get('shear', .15)
    zoom = vargs.get('zoom', .12)
    if vargs['train']:
        return ImageDataGenerator(
            rescale=1./255.,
            horizontal_flip=True,
            vertical_flip=False,
            height_shift_range=.1,
            width_shift_range=.1,
            rotation_range=rotation,
            shear_range=shear,
            zoom_range=zoom)
    return ImageDataGenerator(rescale=1./255.)
## Create the actual generators using the output of my_image_augmentation for your training data.
def make_train_gen(**vargs):
    """Build the augmented training batch generator via flow_from_dataframe."""
    dataframe = vargs['df']
    augmenter = my_image_augmentation(**vargs)
    # Images are read from the full file paths stored in the 'path' column.
    return augmenter.flow_from_dataframe(
        dataframe=dataframe,
        directory=None,
        x_col='path',
        y_col='pneumonia_class',
        class_mode='binary',
        target_size=IMG_SIZE,
        batch_size=BATCH_SIZE,
        seed=RND_STATE)
def make_val_gen(**vargs):
    """Build the validation generator (pass train=False for rescale-only).

    BUG FIX: batch_size previously read the global val_data instead of the
    df argument, so calling this with any other dataframe produced the
    wrong batch size (or crashed if val_data was not defined yet).
    """
    df = vargs['df']
    aug = my_image_augmentation(**vargs)
    # One batch spans the entire dataframe so a single next() call yields
    # the full validation set.
    val_gen = aug.flow_from_dataframe(
        dataframe=df,
        directory=None,
        x_col='path',
        y_col='pneumonia_class',
        class_mode='binary',
        target_size=IMG_SIZE,
        batch_size=len(df),
        seed=RND_STATE)
    return val_gen
# Instantiate the generators: augmented training batches, rescale-only validation.
train_gen = make_train_gen(df=train_data, train=True)
val_gen = make_val_gen(df=val_data, train=False)
train_gen.class_indices
## Pull a single large batch of validation data for testing after each epoch.
valX, valY = val_gen.next()
## May want to look at some examples of our augmented training data.
## This is helpful for understanding the extent to which data is being manipulated prior to training,
## and can be compared with how the raw data look prior to augmentation.
t_x, t_y = next(train_gen)
fig, m_axs = plt.subplots(2, 4, figsize=(14,8))
for (c_x, c_y, c_ax) in zip(t_x, t_y, m_axs.flatten()):
    c_ax.imshow(c_x[:,:,0], cmap='bone')
    if c_y == 1:
        c_ax.set_title('Pneumonia')
    else:
        c_ax.set_title('No Pneumonia')
    c_ax.axis('off')
    c_ax.set_visible(True)
fig.tight_layout()
fig.savefig('./out/examples_of_augmented_training_data.png', dpi=300)
plt.show()
plt.draw()
# Pixel-intensity histograms of the same augmented batch.
fig, m_axs = plt.subplots(2, 4, figsize=(14,6))
for (c_x, c_y, c_ax) in zip(t_x, t_y, m_axs.flatten()):
    c_ax.hist(c_x[:,:,0].flatten(), bins=256)
    if c_y == 1:
        c_ax.set_title('Pneumonia')
    else:
        c_ax.set_title('No Pneumonia')
    c_ax.set_visible(True)
fig.tight_layout()
fig.savefig('./out/examples_of_augmented_training_data_hist.png', dpi=300)
plt.show()
plt.draw()
Recommendation here to use a pre-trained network downloaded from Keras for fine-tuning.
## Loads the pre-trained model.
def load_pretrained(**vargs):
    """Load ImageNet-weighted VGG16 truncated at 'block5_pool'.

    Returns:
        A Model whose output is the final pooling layer, suitable for
        stacking a custom classification head on top.
    """
    pretrained_model = VGG16(include_top=True, weights='imagenet')
    transfer_layer = pretrained_model.get_layer('block5_pool')
    model = Model(inputs=pretrained_model.input, outputs=transfer_layer.output)
    # Log each layer's trainable flag (build() decides what gets frozen).
    print('{} Layers:'.format(pretrained_model.name.upper()))
    for layer in model.layers:
        # Plain %-formatting; the original used an f-string prefix together
        # with %-placeholders, which only worked by accident.
        print('%s: Trainable? %s' % (layer.name, layer.trainable))
    return model
def build(**vargs):
    """Assemble and compile a VGG16-based binary pneumonia classifier.

    Keyword Args:
        name (str): model name.
        end_layer_freeze_at (int): when specific_layer is None, freeze the
            pretrained layers [0:end_layer_freeze_at].
        specific_layer (int | None): if given, only this layer index of the
            pretrained model stays trainable.
        fc_list (list[int]): widths of optional Dropout+Dense blocks inserted
            before the sigmoid head (empty list -> none).

    Returns:
        A compiled Sequential model (sigmoid output, binary crossentropy).
    """
    # Load the pre-trained, truncated VGG16 backbone.
    pretrained_model = load_pretrained()
    end_layer_freeze_at = vargs['end_layer_freeze_at']
    specific_layer = vargs['specific_layer']
    fc_list = vargs['fc_list']
    model_name = vargs['name']
    if specific_layer is None:  # Identity comparison with None (PEP 8); was '== None'.
        for layer in pretrained_model.layers[0:end_layer_freeze_at]:
            layer.trainable = False
    else:
        # Freeze everything except the one requested layer.
        for ind, layer in enumerate(pretrained_model.layers):
            if ind != specific_layer:
                layer.trainable = False
    for layer in pretrained_model.layers:
        print(layer.name, layer.trainable)
    model = Sequential(name=model_name)
    model.add(pretrained_model)
    # Flatten the convolutional feature map before the dense head.
    model.add(Flatten())
    # Optional fully-connected blocks, each preceded by dropout.
    for num in fc_list:
        model.add(Dropout(.5))
        model.add(Dense(num, activation='relu'))
    # Dense sigmoid output for binary classification.
    model.add(Dense(1, activation='sigmoid'))
    # 'learning_rate' replaces the deprecated 'lr' keyword.
    optimizer = Adam(learning_rate=LR_RATE)
    model.compile(optimizer=optimizer, loss=LOSS, metrics=METRICS)
    return model
## STAND-OUT Suggestion: choose another output layer besides just the last classification layer of your modele
## to output class activation maps to aid in clinical interpretation of your model's results.
# Instantiate the truncated VGG16 and inspect its structure.
pretrained_model = load_pretrained()
pretrained_model.summary()
## Get the names of layers of the CNN model:
layer_names = [layer.name for layer in pretrained_model.layers]
layer_names
## Get the output tensors of the layers (useful for class activation maps):
layer_outputs = [layer.output for layer in pretrained_model.layers]
layer_outputs
## Generate feature maps from the CNN model and log them:
# feature_maps = pretrained_model.predict(pretrained_model.input)
# for layer_name, feature_map in zip(
# layer_names,
# feature_maps):print(f"The shape of the {layer_name} is =======>> {feature_map.shape}")
The BATCH_SIZE hyperparameter was reduced to 8 to avoid the "Out of Memory" (OOM) errors observed during the initial training sessions of the model (the GPU has 10 GB of VRAM).
vgg_model_v1 = build(
    name="VGG16_v1",           # Baseline model: no extra FC layers.
    end_layer_freeze_at=-1,    # Freeze all pretrained layers except the last.
    specific_layer=None,
    fc_list=[])                # Empty list -> Flatten feeds the sigmoid head directly.
vgg_model_v1.summary()
## Below is some helper code that will allow you to add checkpoints to your model,
## This will save the 'best' version of your model by comparing it to previous epochs of training
## Note that you need to choose which metric to monitor for your model's 'best' performance if using this code.
## The 'patience' parameter is set to 10, meaning that your model will train for ten epochs without seeing
## improvement before quitting
# Add checkpoints to your model.
# This will save the 'best' version of your model by comparing it to previous epochs of training.
def add_checkpoints(**vargs):
    """Build the callback list: best-val_loss checkpointing plus early stopping.

    Keyword Args:
        model: the model whose .name keys the weight file.

    Returns:
        (weight_path, callbacks_list): weight_path is where the best weights
        (lowest val_loss) get written; training stops after 10 epochs with
        no val_loss improvement.
    """
    model = vargs['model']
    weight_path = "./out/{}_{}.best.hdf5".format('xray_classification', model.name)
    best_checkpoint = ModelCheckpoint(
        weight_path,
        monitor='val_loss',
        mode='min',
        verbose=1,
        save_best_only=True,
        save_weights_only=True)
    early_stop = EarlyStopping(monitor='val_loss', mode='min', patience=10)
    return weight_path, [best_checkpoint, early_stop]
weight_path, callbacks_list = add_checkpoints(model=vgg_model_v1)
## Train model v1. Model.fit accepts generators directly; fit_generator is
## deprecated and removed in newer TF releases.
history = vgg_model_v1.fit(
    train_gen,
    validation_data=(valX, valY),
    epochs=NUM_EPOCHS,
    callbacks=callbacks_list)
evaluation = vgg_model_v1.evaluate(train_gen)
# Loss is not a percentage; report it raw (the original scaled it by 100).
print(f"Train loss: {evaluation[0]:.4f}")
print(f"Train accuracy: {evaluation[1] * 100:.2f}%")
Note: these figures will come in handy for your FDA documentation later in the project.
## Log the performance statistics (best epoch by val_loss) for later comparison.
performance = []
history_df = pd.DataFrame(history.history)
performance.append(history_df[history_df['val_loss']==min(history_df['val_loss'])])
performance
## After training, make some predictions to assess your model's overall performance.
## Note that detecting pneumonia is hard even for trained expert radiologists,
## so there is no need to make the model perfect.
# Restore the best (lowest val_loss) weights before predicting.
vgg_model_v1.load_weights(weight_path)
pred_Y = vgg_model_v1.predict(valX, batch_size=BATCH_SIZE, verbose=True)
pred_Y.shape
## Save model history (i.e., train results) to a .csv file.
def save_history_df_to_file(model, df=None):
    """Write a training-history dataframe to ./out/history_df_<model name>.csv.

    Args:
        model: the trained model (its .name keys the filename).
        df: history dataframe to save; defaults to the global history_df for
            backward compatibility (the original always read the global, so
            it silently saved whatever history was last assigned).
    """
    if df is None:
        df = history_df
    path = "./out/{}_{}.csv".format('history_df', model.name)
    df.to_csv(path)
    return
save_history_df_to_file(vgg_model_v1)
## Hint: can use scikit-learn's built in functions here like roc_curve.
FIG_SIZE = (16, 4)
def plot_auc(model, t_y, p_y):
    """Plot and save the ROC curve (with its AUC) for the given predictions."""
    false_pos, true_pos, _ = roc_curve(t_y, p_y)
    roc_auc = auc(false_pos, true_pos)
    figure, axis = plt.subplots(1, 1, figsize=FIG_SIZE)
    axis.plot(false_pos, true_pos, label='AUC = %0.4f' % roc_auc)
    axis.set_xlabel('False Positive Rate')
    axis.set_ylabel('True Positive Rate')
    axis.legend(loc='best', fontsize='large')
    axis.set_title('{} ROC Curve'.format(model.name))
    figure.savefig('./out/{}_ROC_Curve.png'.format(model.name), dpi=300)
    return
## What other performance statistics do you want to include here besides AUC?
def plot_f1_scores(model, t_y, p_y):
    """Plot F1-score as a function of decision threshold and save the figure.

    BUG FIX: the original nested f1() helper returned 0 as soon as it met a
    threshold <= the requested one; since precision_recall_curve returns
    thresholds sorted ascending, every score came back 0. F1 is now computed
    directly from the precision/recall arrays, aligned with thresholds
    (precision/recall carry one extra trailing element).
    """
    precision, recall, thresholds = precision_recall_curve(t_y, p_y)
    with np.errstate(divide='ignore', invalid='ignore'):
        f1_scores = np.nan_to_num(
            2. * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1]))
    idx = int(np.argmax(f1_scores))
    fig, ax = plt.subplots(1, 1, figsize=FIG_SIZE)
    ax.plot(thresholds, f1_scores, label='Threshold at MAX F1-score = %.4f' % (thresholds[idx]))
    ax.set_xlabel('Threshold')
    ax.set_ylabel('F1-score')
    ax.legend(loc='best', fontsize='large')
    ax.set_title('{} F1-scores by Threshold'.format(model.name))
    fig.savefig('./out/{}_F1_scores_by_Threshold.png'.format(model.name), dpi=300)
    return
def plot_prec_recall(model, t_y, p_y):
    """Plot and save the precision/recall curve for the given predictions."""
    prec_vals, rec_vals, _ = precision_recall_curve(t_y, p_y)
    figure, axis = plt.subplots(1, 1, figsize=FIG_SIZE)
    axis.plot(rec_vals, prec_vals)
    axis.set_xlabel('Recall')
    axis.set_ylabel('Precision')
    axis.set_title('{} Precision/Recall Curve'.format(model.name))
    figure.savefig('./out/{}_Precision_Recall_Curve.png'.format(model.name), dpi=300)
    return
def plot_prec_recall_by_threshold(model, t_y, p_y):
    """Plot precision and recall as functions of the decision threshold."""
    prec_vals, rec_vals, thr_vals = precision_recall_curve(t_y, p_y, pos_label=1)
    plt.figure(figsize=FIG_SIZE)
    # precision/recall carry one trailing point more than thresholds; drop it.
    plt.plot(thr_vals, prec_vals[:-1], lw=2, label='Precision')
    plt.plot(thr_vals, rec_vals[:-1], lw=2, label='Recall')
    plt.legend(loc='best', fontsize='large')
    plt.xlabel('Threshold')
    plt.ylabel('Precision/Recall')
    plt.title('{} Precision/Recall by Threshold'.format(model.name))
    plt.savefig('./out/{}_Precision_Recall_by_Threshold.png'.format(model.name), dpi=300)
    plt.show()
def plot_f1_recall_by_threshold(model, t_y, p_y):
    """Plot Recall and F1-score against the decision threshold and save it.

    BUG FIXES vs. the original:
    - it read the globals valY/pred_Y instead of the t_y/p_y parameters;
    - its nested f1() helper always returned 0 (early 'return 0' with
      ascending thresholds) -- F1 is now computed vectorized;
    - it saved over plot_f1_scores' output file; a distinct name is used.
    """
    precision, recall, thresholds = precision_recall_curve(t_y, p_y)
    with np.errstate(divide='ignore', invalid='ignore'):
        f1_scores = np.nan_to_num(
            2. * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1]))
    plt.figure(figsize=FIG_SIZE)
    plt.plot(thresholds, recall[:-1], lw=2, label='Recall')
    plt.plot(thresholds, f1_scores, label='F1-score')
    plt.xlabel('Threshold')
    plt.ylabel('F1-score')
    plt.legend(loc='best', fontsize='large')
    plt.title('{} F1/Recall by Threshold'.format(model.name))
    plt.savefig('./out/{}_F1_Recall_by_Threshold.png'.format(model.name), dpi=300)
    plt.show()
## Also consider plotting the history of your model training:
def plot_acc(model, history):
    """Plot train vs. validation binary accuracy per epoch and save the figure."""
    epochs = np.arange(0, len(history.history["loss"]))
    plt.figure(figsize=FIG_SIZE)
    curve_specs = [("binary_accuracy", "train_acc"), ("val_binary_accuracy", "val_acc")]
    for metric_key, curve_label in curve_specs:
        plt.plot(epochs, history.history[metric_key], label=curve_label)
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.legend(loc="best", fontsize='large')
    plt.title('{} Training Evolution (Accuracy)'.format(model.name))
    plt.savefig('./out/{}_Training_Evolution_Accuracy.png'.format(model.name), dpi=300)
    plt.show()
def plot_loss(model, history):
    """Plot train vs. validation loss per epoch and save the figure."""
    x_vals = np.arange(0, len(history.history["loss"]))
    metrics = ["loss", "val_loss"]
    # BUG FIX: the original labels list carried two extra accuracy entries
    # ("train_acc", "val_acc") that never corresponded to a plotted metric.
    labels = ["train_loss", "val_loss"]
    plt.figure(figsize=FIG_SIZE)
    for i, metric in enumerate(metrics):
        plt.plot(x_vals,
                 history.history[metric],
                 label=labels[i])
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend(loc="best", fontsize='large')
    plt.title('{} Training Evolution (Losses)'.format(model.name))
    plt.savefig('./out/{}_Training_Evolution_Losses.png'.format(model.name), dpi=300)
    plt.show()
## Plot the history of your model training.
def plot_history(model, history):
    """Plot the accuracy and loss evolution of a completed training run."""
    plot_acc(model, history)
    plot_loss(model, history)
# Calculate the F1-score metric (harmonic mean of precision and recall).
def calc_f1_score(prec, recall):
    """Return F1 = 2*P*R / (P+R); NaN when precision + recall == 0.

    Accepts scalars or NumPy arrays. The original raised ZeroDivisionError
    for plain-float zero inputs; callers test the result with isnan, so an
    explicit NaN is returned instead (matching NumPy's 0/0 behavior).
    """
    denom = prec + recall
    if np.isscalar(denom) and denom == 0:
        return float('nan')
    with np.errstate(divide='ignore', invalid='ignore'):
        return (2. * (prec * recall) / denom)
# Obtain the threshold at maximum f1_score => best threshold.
# Finds the threshold at maximum F1 computed from precision and recall, and
# logs the threshold with the corresponding precision and recall values.
def calc_best_threshold(precision, recall, thresholds=None):
    """Return the threshold that maximizes F1, logging P/R at that point.

    Args:
        precision, recall: arrays from precision_recall_curve (these carry
            one trailing element more than thresholds).
        thresholds: threshold array; defaults to the global 'thresholds' for
            backward compatibility (the original read the global directly).
    """
    if thresholds is None:
        thresholds = globals()['thresholds']
    n = len(thresholds)
    # Align with thresholds: drop the extra final precision/recall entry so
    # nanargmax can never index past the end of the thresholds array (a
    # latent IndexError in the original).
    f1_vals = calc_f1_score(np.asarray(precision)[:n], np.asarray(recall)[:n])
    idx_max = int(np.nanargmax(f1_vals))
    best_threshold = thresholds[idx_max]
    print(f'Threshold that maximized the F1-score: {best_threshold:.4f}')
    print(f'Corresponding Precision: {precision[idx_max]:.2f}')
    print(f'Recall: {recall[idx_max]:.2f}')
    return best_threshold
## Plot all evaluation figures for model v1.
plot_history(vgg_model_v1, history)
plot_auc(vgg_model_v1, valY, pred_Y)
plot_f1_scores(vgg_model_v1, valY, pred_Y)
plot_prec_recall(vgg_model_v1, valY, pred_Y)
plot_prec_recall_by_threshold(vgg_model_v1, valY, pred_Y)
plot_f1_recall_by_threshold(vgg_model_v1, valY, pred_Y)
## Threshold analysis for model v1.
precision, recall, thresholds = precision_recall_curve(valY, pred_Y)
# BUG FIX: the original f1() helper hit an early 'return 0' on the first
# threshold not greater than the requested one; since thresholds are sorted
# ascending, every score came back 0. F1 is computed vectorized instead,
# aligned with the thresholds array (precision/recall have one extra entry).
with np.errstate(divide='ignore', invalid='ignore'):
    f1_scores = np.nan_to_num(
        2. * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1]))
idx = int(np.argmax(f1_scores))
# Named 'best_f1' so the value no longer shadows a helper named f1.
best_f1 = f1_scores[idx]
print(f'Best/Max F1-score: {best_f1: .4f}\nThreshold at MAX F1-score: {thresholds[idx]: .4f}\nPrecision: {precision[idx]: .4f}\nRecall: {recall[idx]: .4f}')
## Get the index of the first threshold greater than a given cut-off.
def get_idx(threshold, thresholds=None):
    """Return the index of the first threshold above the given cut-off.

    Args:
        threshold (float): cut-off to look up.
        thresholds: ascending threshold sequence; defaults to the global
            'thresholds' for backward compatibility with the original.

    Returns:
        int: first index with thresholds[idx] > threshold. Falls back to the
        last index when no element qualifies -- the original returned None
        in that case, which crashed callers that index arrays with it.
    """
    if thresholds is None:
        thresholds = globals()['thresholds']
    for idx, t in enumerate(thresholds):
        if t > threshold:
            return idx
    return len(thresholds) - 1
# Inspect the operating points around threshold ~0.3 for model v1.
print(f'Precision: {precision[get_idx(.29)]: .3f}, Recall: {recall[get_idx(.29)]: .3f}, F1-score: {f1_scores[get_idx(.29)]: .3f}, Threshold: {thresholds[get_idx(.29)]: .3f}')
print(f'Precision: {precision[get_idx(.3)]: .3f}, Recall: {recall[get_idx(.3)]: .3f}, F1-score: {f1_scores[get_idx(.3)]: .3f}, Threshold: {thresholds[get_idx(.3)]: .3f}')
print(f'Precision: {precision[get_idx(.31)]: .3f}, Recall: {recall[get_idx(.31)]: .3f}, F1-score: {f1_scores[get_idx(.31)]: .3f}, Threshold: {thresholds[get_idx(.31)]: .3f}')
print(f'Precision: {precision[get_idx(.35)]: .3f}, Recall: {recall[get_idx(.35)]: .3f}, F1-score: {f1_scores[get_idx(.35)]: .3f}, Threshold: {thresholds[get_idx(.35)]: .3f}')
def log_confusion_matrix_results(threshold=.3):
    """Print TN/FP/FN/TP for the global valY vs. (pred_Y > threshold).

    The cut-off is now a parameter (default .3, the value previously
    hard-coded) so other operating points can be inspected without editing
    the function; existing no-argument calls behave identically.
    """
    tn, fp, fn, tp = confusion_matrix(valY, (pred_Y > threshold).astype(int)).ravel()
    print(f'True Negatives: {tn: .0f}')
    print(f'False Positives: {fp: .0f}')
    print(f'False Negatives: {fn: .0f}')
    print(f'True Positives: {tp: .0f}')
log_confusion_matrix_results()
## Just save the model architecture (no weights) to a .json file.
def save_model_architecture(model):
    """Serialize the model architecture to ./out/<model name>.json."""
    # Robustness: make sure the output directory exists before writing.
    os.makedirs('./out', exist_ok=True)
    model_json = model.to_json()
    with open("./out/{}.json".format(model.name), "w") as json_file:
        json_file.write(model_json)
    return
## Save model v1's architecture.
save_model_architecture(model=vgg_model_v1)
fc_list=[1024] adds one FC layer to the model architecture, since the length of the list is 1 (see the definition of build(**vargs) for details). The BATCH_SIZE hyperparameter was reduced to 8 to avoid the OOM errors observed during the initial training sessions (the GPU has 10 GB of VRAM).
vgg_model_v2 = build(
    name="VGG16_v2",
    end_layer_freeze_at=-1,    # Freeze all pretrained layers except the last.
    specific_layer=None,
    fc_list=[1024])            # One Dropout+Dense(1024) block before the head.
vgg_model_v2.summary()
weight_path, callbacks_list = add_checkpoints(model=vgg_model_v2)
## Train model v2. Model.fit accepts generators directly; fit_generator is
## deprecated and removed in newer TF releases.
history = vgg_model_v2.fit(
    train_gen,
    validation_data=(valX, valY),
    epochs=NUM_EPOCHS,
    callbacks=callbacks_list)
evaluation = vgg_model_v2.evaluate(train_gen)
# Loss is not a percentage; report it raw (the original scaled it by 100).
print(f"Train loss: {evaluation[0]:.4f}")
print(f"Train accuracy: {evaluation[1] * 100:.2f}%")
After training, we look at the performance of this model by plotting some performance statistics.
Again, these figures will come in handy for the FDA documentation later in the project.
# Log best-epoch stats for model v2 alongside v1.
history_df = pd.DataFrame(history.history)
performance.append(history_df[history_df['val_loss']==min(history_df['val_loss'])])
performance
## After training, make some predictions to assess your model's overall performance.
## Note that detecting pneumonia is hard even for trained expert radiologists,
## so there is no need to make the model perfect.
# Restore the best weights and predict on the held-out validation batch.
vgg_model_v2.load_weights(weight_path)
pred_Y = vgg_model_v2.predict(valX, batch_size=BATCH_SIZE, verbose=True)
pred_Y.shape
save_history_df_to_file(vgg_model_v2)
## Plot all evaluation figures for model v2.
plot_history(vgg_model_v2, history)
plot_auc(vgg_model_v2, valY, pred_Y)
plot_f1_scores(vgg_model_v2, valY, pred_Y)
plot_prec_recall(vgg_model_v2, valY, pred_Y)
plot_prec_recall_by_threshold(vgg_model_v2, valY, pred_Y)
plot_f1_recall_by_threshold(vgg_model_v2, valY, pred_Y)
## Threshold analysis for model v2.
precision, recall, thresholds = precision_recall_curve(valY, pred_Y)
# BUG FIX: the original f1() helper hit an early 'return 0' on the first
# threshold not greater than the requested one; since thresholds are sorted
# ascending, every score came back 0. F1 is computed vectorized instead,
# aligned with the thresholds array (precision/recall have one extra entry).
with np.errstate(divide='ignore', invalid='ignore'):
    f1_scores = np.nan_to_num(
        2. * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1]))
idx = int(np.argmax(f1_scores))
# Named 'best_f1' so the value no longer shadows a helper named f1.
best_f1 = f1_scores[idx]
print(f'Best/Max F1-score: {best_f1: .4f}\nThreshold at MAX F1-score: {thresholds[idx]: .4f}\nPrecision: {precision[idx]: .4f}\nRecall: {recall[idx]: .4f}')
# Operating points around threshold ~0.3 for model v2, plus its confusion matrix.
print(f'Precision: {precision[get_idx(.29)]: .3f}, Recall: {recall[get_idx(.29)]: .3f}, F1-score: {f1_scores[get_idx(.29)]: .3f}, Threshold: {thresholds[get_idx(.29)]: .3f}')
print(f'Precision: {precision[get_idx(.3)]: .3f}, Recall: {recall[get_idx(.3)]: .3f}, F1-score: {f1_scores[get_idx(.3)]: .3f}, Threshold: {thresholds[get_idx(.3)]: .3f}')
print(f'Precision: {precision[get_idx(.31)]: .3f}, Recall: {recall[get_idx(.31)]: .3f}, F1-score: {f1_scores[get_idx(.31)]: .3f}, Threshold: {thresholds[get_idx(.31)]: .3f}')
print(f'Precision: {precision[get_idx(.35)]: .3f}, Recall: {recall[get_idx(.35)]: .3f}, F1-score: {f1_scores[get_idx(.35)]: .3f}, Threshold: {thresholds[get_idx(.35)]: .3f}')
log_confusion_matrix_results()
## Save model v2's architecture.
save_model_architecture(model=vgg_model_v2)
## We will find the threshold that optimizes this model's performance,
## and use it to make binary classifications later (see below).
# Precision and Recall for model v2 were already computed above.
BEST_THRESHOLD_MODEL_2 = calc_best_threshold(precision, recall)
print(f'{vgg_model_v2.name}\'s best/max threshold: {BEST_THRESHOLD_MODEL_2: .4f}')
## Saved for later use (comorbidity analysis below):
pred_Y_vgg_model_v2 = pred_Y
fc_list=[1024, 512, 256, 128] adds four FC layers to the model architecture, since the length of the list is 4 (see the definition of build(**vargs) for details). The BATCH_SIZE hyperparameter was reduced to 8 to avoid the OOM errors observed during the initial training sessions (the GPU has 10 GB of VRAM).
vgg_model_v3 = build(
    name="VGG16_v3",
    end_layer_freeze_at=-1,            # Freeze all pretrained layers except the last.
    specific_layer=None,
    fc_list=[1024, 512, 256, 128])     # Four Dropout+Dense blocks before the head.
vgg_model_v3.summary()
weight_path, callbacks_list = add_checkpoints(model=vgg_model_v3)
## Train model v3. Model.fit accepts generators directly; fit_generator is
## deprecated and removed in newer TF releases.
history = vgg_model_v3.fit(
    train_gen,
    validation_data=(valX, valY),
    epochs=NUM_EPOCHS,
    callbacks=callbacks_list)
evaluation = vgg_model_v3.evaluate(train_gen)
# Loss is not a percentage; report it raw (the original scaled it by 100).
print(f"Train loss: {evaluation[0]:.4f}")
print(f"Train accuracy: {evaluation[1] * 100:.2f}%")
After training, we look at the performance of this model by plotting some performance statistics.
Again, these figures will come in handy for the FDA documentation later in the project.
# Log best-epoch stats for model v3.
history_df = pd.DataFrame(history.history)
performance.append(history_df[history_df['val_loss']==min(history_df['val_loss'])])
performance
## After training, make some predictions to assess your model's overall performance.
## Note that detecting pneumonia is hard even for trained expert radiologists,
## so there is no need to make the model perfect.
# Restore the best weights and predict on the held-out validation batch.
vgg_model_v3.load_weights(weight_path)
pred_Y = vgg_model_v3.predict(valX, batch_size=BATCH_SIZE, verbose=True)
pred_Y.shape
save_history_df_to_file(vgg_model_v3)
## Plot all evaluation figures for model v3.
plot_history(vgg_model_v3, history)
plot_auc(vgg_model_v3, valY, pred_Y)
plot_f1_scores(vgg_model_v3, valY, pred_Y)
plot_prec_recall(vgg_model_v3, valY, pred_Y)
plot_prec_recall_by_threshold(vgg_model_v3, valY, pred_Y)
plot_f1_recall_by_threshold(vgg_model_v3, valY, pred_Y)
## Threshold analysis for model v3.
precision, recall, thresholds = precision_recall_curve(valY, pred_Y)
# BUG FIX: the original f1() helper hit an early 'return 0' on the first
# threshold not greater than the requested one; since thresholds are sorted
# ascending, every score came back 0. F1 is computed vectorized instead,
# aligned with the thresholds array (precision/recall have one extra entry).
with np.errstate(divide='ignore', invalid='ignore'):
    f1_scores = np.nan_to_num(
        2. * precision[:-1] * recall[:-1] / (precision[:-1] + recall[:-1]))
idx = int(np.argmax(f1_scores))
# Named 'best_f1' so the value no longer shadows a helper named f1.
best_f1 = f1_scores[idx]
print(f'Best/Max F1-score: {best_f1: .4f}\nThreshold at MAX F1-score: {thresholds[idx]: .4f}\nPrecision: {precision[idx]: .4f}\nRecall: {recall[idx]: .4f}')
# Operating points around threshold ~0.3 for model v3, plus its confusion matrix.
print(f'Precision: {precision[get_idx(.29)]: .3f}, Recall: {recall[get_idx(.29)]: .3f}, F1-score: {f1_scores[get_idx(.29)]: .3f}, Threshold: {thresholds[get_idx(.29)]: .3f}')
print(f'Precision: {precision[get_idx(.3)]: .3f}, Recall: {recall[get_idx(.3)]: .3f}, F1-score: {f1_scores[get_idx(.3)]: .3f}, Threshold: {thresholds[get_idx(.3)]: .3f}')
print(f'Precision: {precision[get_idx(.31)]: .3f}, Recall: {recall[get_idx(.31)]: .3f}, F1-score: {f1_scores[get_idx(.31)]: .3f}, Threshold: {thresholds[get_idx(.31)]: .3f}')
print(f'Precision: {precision[get_idx(.35)]: .3f}, Recall: {recall[get_idx(.35)]: .3f}, F1-score: {f1_scores[get_idx(.35)]: .3f}, Threshold: {thresholds[get_idx(.35)]: .3f}')
log_confusion_matrix_results()
## Save model v3's architecture.
save_model_architecture(model=vgg_model_v3)
## Find the threshold that optimizes your model's performance,
## and use that threshold to make binary classification. Make sure you take all your metrics into consideration.
# Model 2's optimal threshold is reused as the final operating point.
BEST_THRESHOLD = BEST_THRESHOLD_MODEL_2
# Alias: the validation labels double as the "test" labels in this notebook.
testY = valY
## Let's look at some examples of predicted vs. true labels with our best model.
# Title format: '<true>, <predicted>' at the chosen operating threshold.
# NOTE(review): at this point pred_Y holds model v3's predictions while
# BEST_THRESHOLD and the saved file name refer to model v2 -- confirm which
# model's predictions are intended here.
fig, m_axs = plt.subplots(10, 10, figsize=(16,16))
# Zipping pred_Y in directly replaces the original's manual index counter.
for (c_x, c_y, c_p, c_ax) in zip(valX[0:100], testY[0:100], pred_Y[0:100], m_axs.flatten()):
    c_ax.imshow(c_x[:,:,0], cmap='bone')
    if c_y == 1:
        c_ax.set_title('1, 1' if c_p > BEST_THRESHOLD else '1, 0')
    else:
        c_ax.set_title('0, 1' if c_p > BEST_THRESHOLD else '0, 0')
    c_ax.axis('off')
fig.savefig('./out/examples_of_predicted_vs_true_cases_with_{}.png'.format(vgg_model_v2.name))
## Save model architecture.
# NOTE(review): duplicate of the earlier save for vgg_model_v3 -- harmless, but redundant.
save_model_architecture(model=vgg_model_v3)
## Attach model v2's raw predictions to the validation dataframe.
val_data['predicted'] = pred_Y_vgg_model_v2
# Vectorized thresholding (np.where) replaces the original row-wise
# .apply(..., axis=1); the resulting values are identical.
val_data['predictions'] = np.where(val_data['predicted'] >= BEST_THRESHOLD, 1., .0)
val_data.sample(20, random_state=RND_STATE)
## Compute the positives and negatives predicted by the model.
tn, fp, fn, tp = confusion_matrix(
    val_data.Pneumonia.values,
    val_data.predictions.values,
    labels=[0,1]).ravel()
print(f'True Negatives: {tn: .0f}')
print(f'False Positives: {fp: .0f}')
print(f'False Negatives: {fn: .0f}')
print(f'True Positives: {tp: .0f}')
## Determine pred_Y_vgg_model_v2's performance without other diseases.
# NOTE(review): assumes both classes are present (tp+fn > 0 and tn+fp > 0).
sensitivity = tp / (tp+fn)
specificity = tn / (tn+fp)
print(f'Sensitivity: %.3f' % sensitivity)
print(f'Specificity: %.3f' % specificity)
## Sensitivity/specificity restricted to the top five diseases that comorbid
## with Pneumonia, according to the data.
for item in ['Atelectasis', 'Consolidation', 'Edema', 'Effusion', 'Infiltration']:
    subset = val_data[val_data[item] == 1]
    tn, fp, fn, tp = confusion_matrix(
        subset.Pneumonia.values,
        subset.predictions.values,
        labels=[0,1]).ravel()
    print('For {}:'.format(item.upper()))
    # Robustness: guard against subsets missing one of the classes, which
    # would raise ZeroDivisionError in the original.
    if tp + fn:
        print('--> Sensitivity: %.3f' % (tp / (tp+fn)))
    else:
        print('--> Sensitivity: n/a (no positive cases)')
    if tn + fp:
        print('--> Specificity: %.3f\n' % (tn / (tn+fp)))
    else:
        print('--> Specificity: n/a (no negative cases)\n')
For convenience we shall refer to Model 2 as the Algorithm…
- The presence of other diseases such as Atelectasis, Edema, and Effusion had an impact on the algorithm's performance in predicting Pneumonia cases from the X-rays. This is verified by the rather lower performance figures shown above.
- Such diseases when comorbid with Pneumonia may well have had an impact on the algorithms’ Sensitivity and Specificity (see the above corresponding results for verification).
- When trying to accurately predict Pneumonia cases while other diseases such as the top five mentioned above are present, the algorithm's specificity is more likely to be impacted, leading to several undesirable False Positives in the results.
## Number of Pneumonia and Non-Pneumonia cases in the data set.
pos_pneumonia = all_xray_df[all_xray_df['Pneumonia']==1]
neg_pneumonia = all_xray_df[all_xray_df['Pneumonia']==0]
print('Total Pneumonia cases in the data: '+ str(len(pos_pneumonia)))
print('Total Non-Pneumonia cases in the data: {}'.format(len(neg_pneumonia)))
print('Percentage (%) of Pneumonia cases in the data: {}%'.format(
np.round(100*len(pos_pneumonia)/(len(pos_pneumonia)+len(neg_pneumonia)), 2)))
## Pneumonia comorbidity with other diseases: top 30 label combinations (plot).
plt.figure(figsize=(16,6))
pos_pneumonia['Finding Labels'].value_counts()[0:30].plot(kind='bar', color='navy')
plt.xlabel('Diseases')
plt.ylabel('Number of Cases')
# NOTE(review): no plotted artist carries a label, so legend() emits a
# "no handles" warning; the typo in the saved filename is kept as-is to
# avoid changing output artifacts.
plt.legend(loc='best', fontsize='large')
plt.savefig('./out/Pneumonia_Omorbitiy_with_Other_Diseases.png', dpi=300)
## We are done with this notebook!
import IPython
# Programmatically shut down the notebook kernel (True requests a restart).
app = IPython.Application.instance()
app.kernel.do_shutdown(True)